Google Cloud Tasksを完全に理解する
Google Cloudデータエンジニアのはんざわです。
つい最近、Cloud Tasksを触ってみる機会があり、そこで学んだことや実際に動かしてみた内容を共有したいと思います。
今回の記事では、Cloud Tasksの概要を整理し、単一のCloud FunctionsをCloud Tasksで起動するまでの流れを実際に構築してみたいと思います。
Cloud Tasksの概要
Cloud Tasksは、大量の分散タスクの実行、ディスパッチ、配信を管理できるフルマネージドサービスです。
Cloud Tasksを使用すると非同期処理を簡単に実装でき、実行のタイミングや回数などを柔軟に制御することが可能です。
公式ドキュメントで紹介されている通り、主なユースケースとしては、次のようなものがあります。
- データベースの更新のようなバックグラウンド処理に時間がかかる可能性のある処理をワーカーに委譲することで、ユーザの応答時間を短縮する
- 予期せぬ本番インシデントが発生した場合にリクエストを維持する
- メインユーザフローから非ユーザ向けタスクを削除することでトラフィックの急増をスムーズにする
- サードパーティのAPIコールレートを管理する
もう少し詳しく
Cloud Tasksには、タスクとキューが存在し、それぞれ異なる役割を持っています。
タスクとキューは以下の画像のような構成になっています。
処理の流れは以下の通りです。
- 「メールを送信する」や「データを加工する」などの実行したい処理を呼び出すリクエストをタスクとして登録する
- タスクはキューに追加され、正常に実行されるまでタスクは保持される
- キューはそのタスクをユーザーが定義したレートで「メールを送信する」や「データを加工する」などの実行したい処理をするターゲットにリクエストを送信する
- キューが正常なHTTPレスポンスコード(200〜299)を受け取ったらそのまま終了し、別のレスポンスが送信されたり、レスポンスがない場合はタスクが再試行される
続いて、タスクとキューについてもう少し詳しく見てみたいと思います。
タスクについて
タスクは、 「メールを送信する」や「データを加工する」などの実行したい処理を呼び出すリクエストに該当します。
タスクで定義したリクエストの送信先は、HTTP/Sエンドポイント と App Engineアプリケーション の2つがあります。
HTTP/SエンドポイントであればCloud Functions、Cloud Run、GKE、Compute Engine、またはオンプレミスのウェブサーバーなどにリクエストを送信することが可能です。
これらのリクエストを受け取り、処理を実行するエンドポイントを ワーカー と呼びます。
HTTP/Sのタスクの場合、すべてのワーカーは、デフォルトのタイムアウトが10分で最大でも30分以内にHTTPレスポンスコード(200~299)を送信する必要があります。別のレスポンスが送信されたり、レスポンスがない場合はタスクが再試行されてしまうので注意しましょう。
キューについて
キューは、タスクが追加されるとユーザーが定義したレートでワーカーにリクエストを送信し、確実に処理されるようにします。
ワーカに送信するリクエストのレートに関しては、後ほどCloud Tasksを準備するパートで紹介します。
実際に動かしてみる
Cloud Functionsの準備
まずは呼び出されるCloud Functionsをデプロイします。
設定内容は次の通りです。その他の項目は全てデフォルトの設定を使用しています。
- 名前:
exec-by-tasks
- リージョン:
asia-northeast1
Cloud Functionsをセキュアーに呼び出すためにトリガーのタイプをHTTPS
に設定し、認証は認証が必要
に設定しました。
また、ソースコードは次のようなシンプルなものを用意しました。
import functions_framework @functions_framework.http def hello_http(request): request_json = request.get_json() name = request_json['name'] print(name) return 'OK'
サービスアカウントの準備
次にCloud TasksがCloud Functionsを呼び出す際に使用するサービスアカウントの準備をします。
今回の検証では、Cloud Functionsを認証が必要なHTTPSトリガーに設定しているため、HTTPリクエストに認証情報を含める必要があります。
一般的にCloud FunctionsやCloud Runにリクエストを送信するときは、OpenID Connect(OIDC)を使用して認証します。
つまり、タスクを実行するサービスアカウントがOIDCから認証情報を取得できるように適切な権限を付与する必要があります。
一見難しそうに見えますが、実際にはとても簡単です。
まずは、OIDCから認証情報を取得するサービスアカウントを作成します。名前は次の通りです。
- 名前:
cloud-tasks
さらに、このサービスアカウントが事前にデプロイした第2世代のCloud Functionsを実行するためにサービス単位でroles/run.invoker(Cloud Run 起動元)
の権限を割り当てます。
これでCloud TasksからCloud FunctionsにHTTPリクエストを送信する際、OIDCで取得した認証情報を含められるようになりました。
Cloud Tasksのキューの準備
次にCloud Tasksのキューを作成します。
名前とリージョンは次の通りです。
- 名前:
exec-cloud-functions
- リージョン:
asia-northeast1
キューの詳細設定は、以下のキャプチャの通りです。
ディスパッチとは、キューのタスクをワーカーに送信することを指します。
つまり、最大ディスパッチ数は、1秒間にキューからタスクをワーカーに送信する最大レート数を指します。
同様に最大同時ディスパッチ数は、Cloud Tasksがキューからタスクをワーカーに送信することを許可する最大数に該当します。
これらのパラメータを適切に設定することでAPIコールレートなどを管理し、バックエンドシステムへの負荷の調整が可能になります。
また、各パラメータの詳細は以下のドキュメントで確認してください。
タスクを作成してCloud Functionsを実行してみる
次に作成したキューからCREATE HTTP TASK
を選択し、タスクを作成します。
入力内容は以下のキャプチャの通りです。
注意点として、Authヘッダー
にOIDC トークンを追加
を選択し、事前に作成したサービスアカウントを割り当てるようにしましょう。
しかし、Cloud Functionsのログを確認すると415エラーが発生していました。。。
どうやら明示的にContent-Type
のヘッダー情報をリクエストに含める必要があるようです。
最終的に以下のキャプチャのようなHTTPタスクを作成しました。
変更点として、ヘッダーにcontent-type
のキーとapplication/json
のバリューを追記しました。
上記の構成で無事に実行することができました!
まとめ
今回はCloud Tasksの概要を整理しながらCloud Functionsを呼び出してみました。
次回以降のブログでは、並列に多数のタスクを実行させてみたり、Pub/Subと比較してみたりしたいと思います。